package com.cantor.consumer.proxy.impl;

import cn.hutool.core.collection.ConcurrentHashSet;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.core.lang.Console;
import cn.hutool.core.lang.WeightRandom;
import cn.hutool.core.net.url.UrlBuilder;
import cn.hutool.core.net.url.UrlQuery;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.cantor.common.exception.ServiceInvokeException;
import com.cantor.common.util.CantorUtil;
import com.cantor.consumer.loadbalance.LoadBalance;
import com.cantor.consumer.pojo.ServiceRef;
import com.cantor.consumer.start.ConsumerNettyKeeper;
import com.cantor.core.message.CantorRequestMessage;
import com.cantor.core.pool.CantorExecutorPool;
import com.google.common.base.Defaults;
import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

@Slf4j
public class DefaultInvocationHandler implements InvocationHandler {

    // 这个Set负责记录拥有mock="fail:return null"且失败过1次的的节点, 被记录后, 后续永久返回null
    private static final Set<String> mockSet = new ConcurrentHashSet<>();

    // 对这个ServiceRef进行执行(构造方法要传)
    private ServiceRef serviceRef;

    // 带版本服务名
    private String serviceNameWithVersion;

    public DefaultInvocationHandler(ServiceRef serviceRef) {
        this.serviceRef = serviceRef;
        this.serviceNameWithVersion = computeServiceNameWithVersion();
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {

        // 准备加权对象搭配负载均衡得到要发送的节点
        List<WeightRandom.WeightObj<String>> weightObjs = mapToWeightObjs();

        // 准备负载策略
        LoadBalance lb = LoadBalance.getStrategy(serviceRef.getLoadBalance());

        // 使用负载
        String selectedNode = lb.select(weightObjs);

        try {
            // 执行远程调用(通知Netty, 阻塞Future...)
            Object finalResult = doRemoteCall(selectedNode, method, args);
            return finalResult;
        } catch (Exception e) {
            e.printStackTrace();
            log.error("服务{}.{}首次调用失败, 原因:{}", serviceNameWithVersion, method.getName(), ExceptionUtil.getSimpleMessage(e));
            // 失败后,如果有mock="fail:return null", 将这个服务加入mockSet
            UrlQuery query = CantorUtil.getUrlBuilder(selectedNode).getQuery();
            String nodeMock = (String) query.get("mock"); // 节点上的mock参数
            String mock = StrUtil.isEmpty(nodeMock) ? serviceRef.getMock() : nodeMock;
            // 如果配置了服务熔断
            if ("fail:return null".equals(mock)) {
                mockSet.add(selectedNode);
                return Defaults.defaultValue(method.getReturnType()); // 返回方法类型对应默认值.
            }
            // 没配置熔断且不允许重试,则直接抛出异常,阻止用户业务继续执行
            else if (serviceRef.getRetries() == 0) {
                throw new ServiceInvokeException(e.getMessage());
            }
            // 否则才开始重试
            else {
                try {
                    Object finalRetryResult = doRetry(method, args);
                    return finalRetryResult;
                } catch (Exception e2) {
                    e2.printStackTrace();
                    log.error("doRetry方法出现异常");
                    return null;
                }
            }
        }
    }

    ///////////////////  方法  ///////////////

    // 开始做重试
    private Object doRetry(Method method, Object[] args) {

        // 准备负载要用的WeightObjs
        List<WeightRandom.WeightObj<String>> weightObjs = mapToWeightObjs();

        // 得到负载均衡策略
        LoadBalance lb = LoadBalance.getStrategy(serviceRef.getLoadBalance());

        // 使用负载拿到节点们(第二个参数指定要拿几个节点)
        Collection<String> selectNodes = lb.select(weightObjs, serviceRef.getRetries());

        // 构建n个任务同时运行(下面开始并行重试, 后期需要加入同步重试)
        CompletableFuture<Object> future = new CompletableFuture<>();
        CountDownLatch latch = new CountDownLatch(selectNodes.size()); // 记录重试任务的完成情况
        selectNodes.forEach(node -> {
            CantorExecutorPool.execute(() -> {
                try {
                    Object finalResult = doRemoteCall(node, method, args);
                    future.complete(finalResult);
                    log.debug("{}.{}(...)重试成功!!!", serviceNameWithVersion, method.getName());
                } catch (Exception e) {
                    log.error("{}.{}(...)重试失败一次, 原因{}", serviceNameWithVersion, method.getName(), e);
                } finally {
                    // 不管成功失败,这一次重试算是执行完了,countDown一下
                    latch.countDown();
                }
            });
        });
        // 当所有重试执行完, 或者某一个重试complete了, 才继续往下执行
        CompletableFuture<Boolean> futureFinishOrRetriesDown = new CompletableFuture<>();
        // 等complete
        CantorExecutorPool.execute(() -> {
            future.join();
            futureFinishOrRetriesDown.complete(true);
        });
        // 等重试结束
        CantorExecutorPool.execute(() -> {
            try {
                latch.await();
                future.complete(null); // 强制结束这个future,值为null
                futureFinishOrRetriesDown.complete(true);
                log.error("{}.{}(...)共{}次重试均失败.", serviceNameWithVersion, method.getName(), selectNodes.size());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        // 等complete或者latch任意一个完毕.
        futureFinishOrRetriesDown.join();
        // 如果complete结束就是正确值; 如果是重试结束导致结束,latch负责将future.complete(null)
        return future.getNow(null);
    }

    /**
     * 一些代码复用方法, 优化代码可读性和可维护性
     */
    // 获取带版本后缀的服务名(版本和serviceName拼接)
    private String computeServiceNameWithVersion() {
        String serviceNameWithVersion = serviceRef.getService().getName();
        if (StrUtil.isNotEmpty(serviceRef.getVersion())) {
            serviceNameWithVersion += ":" + serviceRef.getVersion();
        }
        return serviceNameWithVersion;
    }

    // 传入ServiceRef, 自动根据里面存放的节点信息封装成 List<WeightObject>.
    private List<WeightRandom.WeightObj<String>> mapToWeightObjs() {
        Collection nodes = serviceRef.getNodes();
        // 准备加权对象搭配负载均衡得到要发送的节点
        List<WeightRandom.WeightObj<String>> weightObjs = new ArrayList<>(nodes.size());
        nodes.forEach((Consumer<String>) node -> {
            // 得到每个node节点字符串中存储的权重
            Double weight = Convert.toDouble(CantorUtil.getUrlBuilder(node).getQuery().get("weight"));
            weightObjs.add(new WeightRandom.WeightObj(node, weight));
        });
        return weightObjs;
    }

    // 执行远程调用(通知Netty, 阻塞Future, 最后返回结果等步骤)
    private Object doRemoteCall(String node, Method method, Object[] args) throws Exception {
        // 根据这个node上的配置, 准备各种参数(节点上配置的优先)
        UrlBuilder urlBuilder = CantorUtil.getUrlBuilder(node);
        UrlQuery query = urlBuilder.getQuery();
        int timeout = 0 == Convert.toInt(query.get("timeout")) ? serviceRef.getTimeout() : Convert.toInt(query.get("timeout"));
//        int retries = 0 == Convert.toInt(query.get("retries")) ? serviceRef.getRetries() : Convert.toInt(query.get("retries"));
        String mock = StrUtil.isEmpty(query.get("mock")) ? serviceRef.getMock() : (String) query.get("mock");
        String host = urlBuilder.getHost();
        Integer port = urlBuilder.getPort();
        // 开始正常远程调用>>>
        // 1. 服务降级:判断是否拥有mock="force:return null"
        if ("force:return null".equals(mock)) {
            // return null;
            return Defaults.defaultValue(method.getReturnType()); // 返回方法类型对应默认值.
        }
        // 2. 服务降级:判断是否有mock="force:return null"且失败过1次
        if ("fail:return null".equals(mock) && mockSet.contains(node)) {
            // return null;
            return Defaults.defaultValue(method.getReturnType()); // 返回方法类型对应默认值.
        }
        // 通知ConsumerNettyKeeper发送数据包, 然后监听一个Future一段时间,超时算失败,如果有fail:return null,记录到set中
        CantorRequestMessage requestMessage = CantorRequestMessage.builder()
                .interfaceName(serviceNameWithVersion)
                .serviceVersion(serviceRef.getVersion())
                .methodName(method.getName())
                .parameterTypes(method.getParameterTypes())
                .parameterValue(args)
                // .returnType(method.getReturnType()) // 弃用
                .build();
        long sequenceId = serviceRef.getCenter().getSequenceId(); // 生成分布式ID
        requestMessage.setSequenceId(sequenceId); // 设置分布式ID
        CompletableFuture future = ConsumerNettyKeeper.sendCantorRequestMessage(host, port, requestMessage); // 发送请求
        Object finalResult = future.get(timeout, TimeUnit.MILLISECONDS); // 等待CompletableFuture
        log.trace("CompletableFuture.get(TIME)得到结果: {}", finalResult);
        return finalResult;
    }

}
